﻿using RdKafka;
using System;
using System.Text;
using System.Threading.Tasks;

namespace test.kafka
{
    /// <summary>
    /// Publishes UTF-8 encoded text messages to the Kafka topic described by a
    /// <see cref="ProducerConfig"/>.
    /// </summary>
    public class KafkaProducer
    {
        private readonly ProducerConfig _producerConfig;

        /// <summary>
        /// Creates a producer wrapper bound to the given configuration.
        /// </summary>
        /// <param name="producerConfig">Supplies the broker list (<c>KafkaServer</c>) and the target <c>Topic</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="producerConfig"/> is null.</exception>
        public KafkaProducer(ProducerConfig producerConfig)
        {
            // Fail at construction time instead of with a NullReferenceException on the first Send.
            if (producerConfig == null)
            {
                throw new ArgumentNullException(nameof(producerConfig));
            }

            _producerConfig = producerConfig;
        }

        /// <summary>
        /// Encodes <paramref name="message"/> as UTF-8, produces it to the configured topic and
        /// writes the partition and offset from the delivery report to the console.
        /// </summary>
        /// <param name="message">Text payload to send.</param>
        /// <returns>A task that completes once the delivery report has been received and logged.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public async Task Send(string message)
        {
            // Validate before any broker resources are created; previously a null message was only
            // rejected by the encoder after the Producer and Topic had already been opened.
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            // Convert the message into a byte[] payload.
            byte[] payload = Encoding.UTF8.GetBytes(message);

            // The Producer is constructed from the configured broker list (one or more brokers).
            using (Producer producer = new Producer(_producerConfig.KafkaServer))
            // Target topic; per the original author's note it is created when missing
            // (NOTE(review): presumably depends on broker auto-create settings - confirm).
            using (Topic topic = producer.Topic(_producerConfig.Topic))
            {
                // No synchronization context is needed after the await, so do not capture it;
                // this avoids deadlocks when a caller blocks on the returned task.
                DeliveryReport deliveryReport = await topic.Produce(payload).ConfigureAwait(false);

                Console.WriteLine($"发送到分区：{deliveryReport.Partition}, Offset 为: {deliveryReport.Offset}");
            }
        }
    }
}
